# -*- coding: utf-8 -*-
from datetime import timedelta
from time import time
from utils.operators.rich_sql_sensor import RichSqlSensor
from jms.dm.dm_whole_effect_mail_push_dt import jms_dm__dm_whole_effect_mail_push_dt

label = f"dm_whole_effect_mail_push_dt_{int(time())}"
timeout = timedelta(hours=1).seconds
doris_jms_dm__dm_whole_effect_mail_push_dt = RichSqlSensor(
    task_id='doris_jms_dm__dm_whole_effect_mail_push_dt',
    pool='broker_load_pool',
    email=['suning@jtexpress.com','yl_bigdata@yl-scm.com'],
    task_concurrency=1,
    conn_id='doris',
    pre_sql=f"""
    TRUNCATE TABLE jms_dm.dm_whole_effect_mail_push_dt PARTITION (
         p{{{{ execution_date | cst_ds_nodash }}}}
        ,p{{{{ execution_date | date_add(-1) | cst_ds_nodash }}}} 
        ,p{{{{ execution_date | date_add(-2) | cst_ds_nodash }}}}
        ,p{{{{ execution_date | date_add(-3) | cst_ds_nodash }}}}
        ,p{{{{ execution_date | date_add(-4) | cst_ds_nodash }}}}
        ,p{{{{ execution_date | date_add(-5) | cst_ds_nodash }}}}
        ,p{{{{ execution_date | date_add(-6) | cst_ds_nodash }}}}
        ,p{{{{ execution_date | date_add(-7) | cst_ds_nodash }}}}
        ,p{{{{ execution_date | date_add(-8) | cst_ds_nodash }}}}
        ,p{{{{ execution_date | date_add(-9) | cst_ds_nodash }}}}
        ,p{{{{ execution_date | date_add(-10) | cst_ds_nodash }}}}
    );
    LOAD LABEL jms_dm.{label} (
        DATA INFILE(
            "hdfs://{{{{ var.value.hadoop_namespace }}}}/dw/hive/jms_dm.db/external/dm_whole_effect_mail_push_dt/dt={{{{ execution_date | date_add(-10) | cst_ds }}}}/*",
            "hdfs://{{{{ var.value.hadoop_namespace }}}}/dw/hive/jms_dm.db/external/dm_whole_effect_mail_push_dt/dt={{{{ execution_date | date_add(-9) | cst_ds }}}}/*",
            "hdfs://{{{{ var.value.hadoop_namespace }}}}/dw/hive/jms_dm.db/external/dm_whole_effect_mail_push_dt/dt={{{{ execution_date | date_add(-8) | cst_ds }}}}/*",
            "hdfs://{{{{ var.value.hadoop_namespace }}}}/dw/hive/jms_dm.db/external/dm_whole_effect_mail_push_dt/dt={{{{ execution_date | date_add(-7) | cst_ds }}}}/*",
            "hdfs://{{{{ var.value.hadoop_namespace }}}}/dw/hive/jms_dm.db/external/dm_whole_effect_mail_push_dt/dt={{{{ execution_date | date_add(-6) | cst_ds }}}}/*",
            "hdfs://{{{{ var.value.hadoop_namespace }}}}/dw/hive/jms_dm.db/external/dm_whole_effect_mail_push_dt/dt={{{{ execution_date | date_add(-5) | cst_ds }}}}/*",
            "hdfs://{{{{ var.value.hadoop_namespace }}}}/dw/hive/jms_dm.db/external/dm_whole_effect_mail_push_dt/dt={{{{ execution_date | date_add(-4) | cst_ds }}}}/*",
            "hdfs://{{{{ var.value.hadoop_namespace }}}}/dw/hive/jms_dm.db/external/dm_whole_effect_mail_push_dt/dt={{{{ execution_date | date_add(-3) | cst_ds }}}}/*",
            "hdfs://{{{{ var.value.hadoop_namespace }}}}/dw/hive/jms_dm.db/external/dm_whole_effect_mail_push_dt/dt={{{{ execution_date | date_add(-2) | cst_ds }}}}/*",
            "hdfs://{{{{ var.value.hadoop_namespace }}}}/dw/hive/jms_dm.db/external/dm_whole_effect_mail_push_dt/dt={{{{ execution_date | date_add(-1) | cst_ds }}}}/*",
            "hdfs://{{{{ var.value.hadoop_namespace }}}}/dw/hive/jms_dm.db/external/dm_whole_effect_mail_push_dt/dt={{{{ execution_date | cst_ds }}}}/*")
        INTO TABLE dm_whole_effect_mail_push_dt
        FORMAT AS 'PARQUET'(
             waybill_no                        
            ,send_network_code                 
            ,send_network_name                 
            ,sign_network_code                 
            ,sign_network_name                 
            ,send_area_id                      
            ,send_area_desc                    
            ,send_city_id                      
            ,send_city_desc                    
            ,send_agent_code                   
            ,send_agent_name                   
            ,send_fran_code                    
            ,send_fran_name                    
            ,send_provider_id                  
            ,send_provider_desc                
            ,send_regional_id                  
            ,send_regional_desc                
            ,sign_area_id                      
            ,sign_area_desc                    
            ,sign_city_id                      
            ,sign_city_desc                    
            ,sign_agent_code                   
            ,sign_agent_name                   
            ,sign_fran_code                    
            ,sign_fran_name                    
            ,sign_provider_id                  
            ,sign_provider_desc                
            ,sign_regional_id                  
            ,sign_regional_desc                
            ,order_source_id                   
            ,ordersource_code                  
            ,ordersource_name                  
            ,deliver_later_user_code           
            ,deliver_later_user                
            ,pre_sign_taking_time              
            ,real_sign_taking_time             
            ,first_center_taking_time          
            ,inout_first_center_time           
            ,dest_first_center_time            
            ,last_first_center_time            
            ,inout_last_center_time            
            ,dest_last_center_time             
            ,pre_sign_dest_time                
            ,real_pre_sign_time                
            ,network_taking_send_time          
            ,network_send_nodal_arrival_time   
            ,first_nodal_arrival_send_time     
            ,nodal_send_center_arrival_time    
            ,nodal_arrival_center_send_time    
            ,end_nodal_arrival_send_time       
            ,nodal_send_network_arrival_time   
            ,end_network_arrival_deliver_time  
            ,deliver_aging_sign_time           
            ,actual_aging_sign_time            
            ,custorder_taking_time             
            ,custorder_pre_sign_time           
            ,pre_sign_taking_cnt               
            ,real_sign_taking_cnt              
            ,first_center_taking_cnt           
            ,inout_first_center_cnt            
            ,dest_first_center_cnt             
            ,last_first_center_cnt             
            ,inout_last_center_cnt             
            ,dest_last_center_cnt              
            ,pre_sign_dest_cnt                 
            ,real_pre_sign_cnt                 
            ,network_taking_send_cnt           
            ,network_send_nodal_arrival_cnt    
            ,first_nodal_arrival_send_cnt      
            ,nodal_send_center_arrival_cnt     
            ,nodal_arrival_center_send_cnt     
            ,end_nodal_arrival_send_cnt        
            ,nodal_send_network_arrival_cnt    
            ,end_network_arrival_deliver_cnt   
            ,deliver_aging_sign_cnt            
            ,actual_aging_sign_cnt             
            ,custorder_taking_cnt              
            ,custorder_pre_sign_cnt            
            ,if_reach                           
        )
        columns from path as (dt)
        set(
             waybill_no                           = waybill_no
            ,send_network_code                    = send_network_code
            ,send_network_name                    = send_network_name
            ,sign_network_code                    = sign_network_code
            ,sign_network_name                    = sign_network_name
            ,send_area_id                         = send_area_id
            ,send_area_desc                       = send_area_desc
            ,send_city_id                         = send_city_id
            ,send_city_desc                       = send_city_desc
            ,send_agent_code                      = send_agent_code
            ,send_agent_name                      = send_agent_name
            ,send_fran_code                       = send_fran_code
            ,send_fran_name                       = send_fran_name
            ,send_provider_id                     = send_provider_id
            ,send_provider_desc                   = send_provider_desc
            ,send_regional_id                     = send_regional_id
            ,send_regional_desc                   = send_regional_desc
            ,sign_area_id                         = sign_area_id
            ,sign_area_desc                       = sign_area_desc
            ,sign_city_id                         = sign_city_id
            ,sign_city_desc                       = sign_city_desc
            ,sign_agent_code                      = sign_agent_code
            ,sign_agent_name                      = sign_agent_name
            ,sign_fran_code                       = sign_fran_code
            ,sign_fran_name                       = sign_fran_name
            ,sign_provider_id                     = sign_provider_id
            ,sign_provider_desc                   = sign_provider_desc
            ,sign_regional_id                     = sign_regional_id
            ,sign_regional_desc                   = sign_regional_desc
            ,order_source_id                      = order_source_id
            ,ordersource_code                     = ordersource_code
            ,ordersource_name                     = ordersource_name
            ,deliver_later_user_code              = deliver_later_user_code
            ,deliver_later_user                   = deliver_later_user
            ,pre_sign_taking_time                 = pre_sign_taking_time
            ,real_sign_taking_time                = real_sign_taking_time
            ,first_center_taking_time             = first_center_taking_time
            ,inout_first_center_time              = inout_first_center_time
            ,dest_first_center_time               = dest_first_center_time
            ,last_first_center_time               = last_first_center_time
            ,inout_last_center_time               = inout_last_center_time
            ,dest_last_center_time                = dest_last_center_time
            ,pre_sign_dest_time                   = pre_sign_dest_time
            ,real_pre_sign_time                   = real_pre_sign_time
            ,network_taking_send_time             = network_taking_send_time
            ,network_send_nodal_arrival_time      = network_send_nodal_arrival_time
            ,first_nodal_arrival_send_time        = first_nodal_arrival_send_time
            ,nodal_send_center_arrival_time       = nodal_send_center_arrival_time
            ,nodal_arrival_center_send_time       = nodal_arrival_center_send_time
            ,end_nodal_arrival_send_time          = end_nodal_arrival_send_time
            ,nodal_send_network_arrival_time      = nodal_send_network_arrival_time
            ,end_network_arrival_deliver_time     = end_network_arrival_deliver_time
            ,deliver_aging_sign_time              = deliver_aging_sign_time
            ,actual_aging_sign_time               = actual_aging_sign_time
            ,custorder_taking_time                = custorder_taking_time
            ,custorder_pre_sign_time              = custorder_pre_sign_time
            ,pre_sign_taking_cnt                  = pre_sign_taking_cnt
            ,real_sign_taking_cnt                 = real_sign_taking_cnt
            ,first_center_taking_cnt              = first_center_taking_cnt
            ,inout_first_center_cnt               = inout_first_center_cnt
            ,dest_first_center_cnt                = dest_first_center_cnt
            ,last_first_center_cnt                = last_first_center_cnt
            ,inout_last_center_cnt                = inout_last_center_cnt
            ,dest_last_center_cnt                 = dest_last_center_cnt
            ,pre_sign_dest_cnt                    = pre_sign_dest_cnt
            ,real_pre_sign_cnt                    = real_pre_sign_cnt
            ,network_taking_send_cnt              = network_taking_send_cnt
            ,network_send_nodal_arrival_cnt       = network_send_nodal_arrival_cnt
            ,first_nodal_arrival_send_cnt         = first_nodal_arrival_send_cnt
            ,nodal_send_center_arrival_cnt        = nodal_send_center_arrival_cnt
            ,nodal_arrival_center_send_cnt        = nodal_arrival_center_send_cnt
            ,end_nodal_arrival_send_cnt           = end_nodal_arrival_send_cnt
            ,nodal_send_network_arrival_cnt       = nodal_send_network_arrival_cnt
            ,end_network_arrival_deliver_cnt      = end_network_arrival_deliver_cnt
            ,deliver_aging_sign_cnt               = deliver_aging_sign_cnt
            ,actual_aging_sign_cnt                = actual_aging_sign_cnt
            ,custorder_taking_cnt                 = custorder_taking_cnt
            ,custorder_pre_sign_cnt               = custorder_pre_sign_cnt
            ,if_reach                             = if_reach
            ,dt                                   = dt
        )
    )
    WITH BROKER '{{{{ var.json.doris_brokers | random_choice }}}}'
    PROPERTIES ('timeout'='{timeout}', 'max_filter_ratio'='0.0')""",
    poke_sql=f"SHOW LOAD FROM jms_dm WHERE label = '{label}' ORDER BY CreateTime DESC LIMIT 1",
    sql_on_kill=f"CANCEL LOAD FROM jms_dm WHERE LABEL = '{label}'",
    success=lambda r: r[2] == 'FINISHED',
    failure=lambda r: (r[2] is not None and r[2] == 'CANCELLED', str(r[7])),
    poke_interval=60,
    execution_timeout=timedelta(seconds=timeout + 120), )

doris_jms_dm__dm_whole_effect_mail_push_dt << jms_dm__dm_whole_effect_mail_push_dt
